import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.*;



/**
 * Example job showing the ways a table function can be invoked from the Flink Table API
 * and from SQL: inline by class, by registered name, with an inner or left outer lateral
 * join, and with the function's output columns renamed.
 *
 * <p>{@code Order} and {@code SplitFunction} are declared outside this file. The queries
 * below rely on the view exposing a {@code product} column and on the function exposing
 * {@code word} and {@code length} output columns.
 */
public class TableFunctions {

    /**
     * Builds every example query over the same three sample orders, prints each result
     * exactly once, and runs the job.
     *
     * @param args command-line arguments; not used
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Order> orders = env.fromCollection(Arrays.asList(
                new Order(1L, "beer#apple", 3),
                new Order(3L, "rubber#pear", 2),
                new Order(1L, "diaper#orange", 4)
        ));

        // Expose the stream under a name so that both Table API and SQL queries can read it.
        tEnv.createTemporaryView("MyTable", orders);

        // --- Table API: call the function "inline", without registering it first ---

        // Inner lateral join.
        Table result1 = tEnv.from("MyTable")
                .joinLateral(call(SplitFunction.class, $("product")))
                .select($("product"), $("word"), $("length"));

        // Left outer lateral join.
        Table result2 = tEnv.from("MyTable")
                .leftOuterJoinLateral(call(SplitFunction.class, $("product")))
                .select($("product"), $("word"), $("length"));

        // Rename the function's output fields in the Table API.
        Table result3 = tEnv.from("MyTable")
                .leftOuterJoinLateral(call(SplitFunction.class, $("product")).as("newWord", "newLength"))
                .select($("product"), $("newWord"), $("newLength"));

        // --- Register the function so it can be referenced by name ---
        tEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class);

        // Call the registered function in the Table API.
        Table result4 = tEnv.from("MyTable")
                .joinLateral(call("SplitFunction", $("product")))
                .select($("product"), $("word"), $("length"));

        Table result5 = tEnv.from("MyTable")
                .leftOuterJoinLateral(call("SplitFunction", $("product")))
                .select($("product"), $("word"), $("length"));

        // Call the registered function in SQL.
        Table result6 = tEnv.sqlQuery(
                "SELECT product, word, length " +
                        "FROM MyTable, LATERAL TABLE(SplitFunction(product))");

        Table result7 = tEnv.sqlQuery(
                "SELECT product, word, length " +
                        "FROM MyTable " +
                        "LEFT JOIN LATERAL TABLE(SplitFunction(product)) ON TRUE");

        // Rename the function's output fields in SQL.
        Table result8 = tEnv.sqlQuery(
                "SELECT product, newWord, newLength " +
                        "FROM MyTable " +
                        "LEFT JOIN LATERAL TABLE(SplitFunction(product)) AS T(newWord, newLength) ON TRUE");

        // Print every result exactly once, in declaration order. An earlier stray print of
        // result4 was removed because it emitted that table's rows a second time.
        for (Table result : Arrays.asList(
                result1, result2, result3, result4, result5, result6, result7, result8)) {
            tEnv.toAppendStream(result, Row.class).print();
        }

        env.execute();
    }
}